Work on less io
authorJeroen van der Heijden <jeroen@transceptor.technology>
Tue, 20 Mar 2018 13:27:43 +0000 (14:27 +0100)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Tue, 20 Mar 2018 13:27:43 +0000 (14:27 +0100)
12 files changed:
Debug/src/siri/db/subdir.mk
Release/src/siri/db/subdir.mk
include/siri/db/chunk.h [new file with mode: 0644]
include/siri/db/insert.h
include/siri/db/shard.h
src/siri/db/buffer.c
src/siri/db/chunk.c [new file with mode: 0644]
src/siri/db/insert.c
src/siri/db/series.c
src/siri/db/shard.c
src/siri/db/shards.c
test/test_insert.py

index 395356fd094231f01dc4110aae6355bd372e03b3..d2ee8e0efd14cd4a9fed2974f8587f1cd0197051 100644 (file)
@@ -8,6 +8,7 @@ C_SRCS += \
 ../src/siri/db/aggregate.c \
 ../src/siri/db/auth.c \
 ../src/siri/db/buffer.c \
+../src/siri/db/chunk.c \
 ../src/siri/db/db.c \
 ../src/siri/db/ffile.c \
 ../src/siri/db/fifo.c \
index 493c442ecf2699e111cd5cb4897fffe9b6d4dae7..dfc8f0166a119023f9e54518315397e9dd02b1bb 100644 (file)
@@ -8,6 +8,7 @@ C_SRCS += \
 ../src/siri/db/aggregate.c \
 ../src/siri/db/auth.c \
 ../src/siri/db/buffer.c \
+../src/siri/db/chunk.c \
 ../src/siri/db/db.c \
 ../src/siri/db/ffile.c \
 ../src/siri/db/fifo.c \
diff --git a/include/siri/db/chunk.h b/include/siri/db/chunk.h
new file mode 100644 (file)
index 0000000..e69de29
index 6e289a14c33b73987628c809180461b385503743..92a081c7a910123b4ba11052d29f1005af61f99d 100644 (file)
@@ -24,7 +24,7 @@
 
 typedef enum
 {
-    ERR_EXPECTING_ARRAY=-9,
+    ERR_EXPECTING_ARRAY=-10,
     ERR_EXPECTING_SERIES_NAME,
     ERR_EXPECTING_MAP_OR_ARRAY,
     ERR_EXPECTING_INTEGER_TS,
index 8093dcb66722dffc2e463aff8817ca4db0c666b3..db9194ac84ed5ed3d10a700ea0a956e514ce55b5 100644 (file)
@@ -88,7 +88,7 @@ int siridb_shard_status(char * str, siridb_shard_t * shard);
 int siridb_shard_load(siridb_t * siridb, uint64_t id);
 void siridb_shard_drop(siridb_shard_t * shard, siridb_t * siridb);
 
-long int siridb_shard_write_points(
+size_t siridb_shard_write_points(
         siridb_t * siridb,
         siridb_series_t * series,
         siridb_shard_t * shard,
index 5e7efa63268355c06185e94a658348f1c67d02ae..dce6b5bb29a36ffdcb605e4f8b9ed727da1c339f 100644 (file)
@@ -60,6 +60,12 @@ int siridb_buffer_write_point(
         uint64_t * ts,
         qp_via_t * val)
 {
+    const size_t sz = sizeof(uint64_t) + sizeof(qp_via_t);
+    char buf[sz];
+
+    memcpy(buf, ts, sizeof(uint64_t));
+    memcpy(buf + sizeof(uint64_t), val, sizeof(qp_via_t));
+
     return (
         siridb_buffer_write_len(siridb, series) ||
 
@@ -68,11 +74,8 @@ int siridb_buffer_write_point(
                 16 * (series->buffer->len - 1),
                 SEEK_CUR) ||
 
-        /* write time-stamp */
-        fwrite(ts, sizeof(uint64_t), 1, siridb->buffer_fp) != 1 ||
-
-        /* write value */
-        fwrite(val, sizeof(qp_via_t), 1, siridb->buffer_fp) != 1) ? EOF : 0;
+        /* write time-stamp and value */
+        fwrite(buf, sz, 1, siridb->buffer_fp) != 1) ? EOF : 0;
 }
 
 /*
@@ -242,6 +245,9 @@ int siridb_buffer_load(siridb_t * siridb)
  */
 static int BUFFER_use_empty(siridb_t * siridb, siridb_series_t * series)
 {
+    const size_t sz = sizeof(uint32_t) + sizeof(size_t);
+    char buf[sz];
+
     series->bf_offset = (long int) slist_pop(siridb->empty_buffers);
 
     /* jump to the correct buffer position */
@@ -251,19 +257,11 @@ static int BUFFER_use_empty(siridb_t * siridb, siridb_series_t * series)
         return -1;
     }
 
-    /* write series ID to buffer */
-    if (fwrite(&series->id, sizeof(uint32_t), 1, siridb->buffer_fp) != 1)
-    {
-        ERR_FILE
-        return -1;
-    }
+    memcpy(buf, &series->id, sizeof(uint32_t));
+    memcpy(buf + sizeof(uint32_t), &series->buffer->len, sizeof(size_t));
 
-    /* write 0 length */
-    if (fwrite(
-            &series->buffer->len,
-            sizeof(size_t),
-            1,
-            siridb->buffer_fp) != 1)
+    /* write series ID and 0 length to buffer */
+    if (fwrite(buf, sz, 1, siridb->buffer_fp) != 1)
     {
         ERR_FILE
         return -1;
diff --git a/src/siri/db/chunk.c b/src/siri/db/chunk.c
new file mode 100644 (file)
index 0000000..e69de29
index 111533086fd57dff01c68be4fb5b08eaf48dfd7b..d6d18710d7d06c593da67fea7720b4cc4939ca65 100644 (file)
@@ -18,6 +18,7 @@
 #include <siri/db/points.h>
 #include <siri/db/replicate.h>
 #include <siri/db/series.h>
+#include <siri/db/servers.h>
 #include <siri/err.h>
 #include <siri/net/promises.h>
 #include <siri/net/protocol.h>
@@ -194,6 +195,7 @@ ssize_t siridb_insert_assign_pools(
     {
         rc = ERR_EXPECTING_MAP_OR_ARRAY;
     }
+
     return (siri_err) ? ERR_MEM_ALLOC : rc;
 }
 
index 5ae9d0c3437992446960aeb10955f865f896444f..8aca61cdb2a98cecc15191f4d28e8f5df4c3d621 100644 (file)
@@ -872,7 +872,7 @@ int siridb_series_optimize_shard(
 
     end += new_idx;
 
-    long int pos;
+    size_t pos;
     uint16_t chunk_sz;
     uint_fast32_t num_chunks, pstart, pend, diff;
     siridb_shard_get_points_cb get_points_cb = \
@@ -919,7 +919,7 @@ int siridb_series_optimize_shard(
                 pstart,
                 pend,
                 siri.optimize->idx_fp,
-                &cinfo)) == EOF)
+                &cinfo)) == 0)
         {
             log_critical(
                     "Cannot write points to shard id '%" PRIu64 "'",
index b5826ad053b479a6a423b624b4d5c544c6a9daa5..cd76ffd05ee1f0f2f4c3eb89c5109c346e3f670f 100644 (file)
@@ -129,7 +129,7 @@ static int SHARD_load_idx(
         int is_ts64);
 static inline int SHARD_init_fn(siridb_t * siridb, siridb_shard_t * shard);
 static int SHARD_grow(siridb_shard_t * shard);
-static int SHARD_write_header(
+static size_t SHARD_write_header(
         siridb_t * siridb,
         siridb_series_t * series,
         siridb_points_t * points,
@@ -457,9 +457,9 @@ int siridb_shard_status(char * str, siridb_shard_t * shard)
  * Writes an index and points to a shard. The return value is the position
  * where the points start in the shard file.
  *
- * If an error has occurred, EOF will be returned and a SIGNAL will be raised.
+ * If an error has occurred, 0 will be returned and a SIGNAL will be raised.
  */
-long int siridb_shard_write_points(
+size_t siridb_shard_write_points(
         siridb_t * siridb,
         siridb_series_t * series,
         siridb_shard_t * shard,
@@ -475,8 +475,7 @@ long int siridb_shard_write_points(
     unsigned char * cdata = NULL;
 
     uint_fast32_t i;
-    long int pos = EOF;
-    int header_sz;
+    size_t pos, header_sz;
 
     if (shard->fp->fp == NULL)
     {
@@ -484,7 +483,7 @@ long int siridb_shard_write_points(
         {
             ERR_FILE
             log_critical("Cannot open file '%s'", shard->fn);
-            return EOF;
+            return 0;
         }
     }
     fp = shard->fp->fp;
@@ -494,8 +493,9 @@ long int siridb_shard_write_points(
         cdata = siridb_points_zip(points, start, end, cinfo, &dsize);
         if (cdata == NULL)
         {
+            ERR_ALLOC
             log_critical("Memory allocation error while compressing points");
-            return -1;
+            return 0;
         }
     }
     else if (series->tp == TP_STRING)
@@ -504,8 +504,9 @@ long int siridb_shard_write_points(
         cdata = siridb_points_raw_string(points, start, end, cinfo, &dsize);
         if (cdata == NULL)
         {
+            ERR_ALLOC
             log_critical("Memory allocation error while compressing points");
-            return -1;
+            return 0;
         }
     }
     else
@@ -523,7 +524,7 @@ long int siridb_shard_write_points(
     if (fseeko(fp, shard->len, SEEK_SET))
     {
         log_critical("Seek error in: '%s'", shard->fn);
-        return -1;
+        return 0;
     }
 
     if (idx_fp == NULL || (shard->flags & SIRIDB_SHARD_HAS_NEW_VALUES))
@@ -552,50 +553,47 @@ long int siridb_shard_write_points(
         pos = shard->len;
     }
 
-    if (header_sz < 0)
+    if (!header_sz)
     {
         ERR_FILE
         log_critical(
                 "Cannot write index header for shard id %" PRIu64,
                 shard->id);
         free(cdata);
-        return EOF;
+        return 0;
     }
 
-    if (cdata != NULL)
+    if (cdata == NULL)
     {
-        long int rc = fwrite(cdata, dsize, 1, fp);
-        free(cdata);
-        if (rc != 1)
+        size_t p = 0;
+        size_t ts_sz = siridb->time->ts_sz;
+        cdata = (unsigned char *) malloc(dsize);
+        if (cdata == NULL)
         {
-            ERR_FILE
-            log_critical("Cannot write points to file '%s'", shard->fn);
-            return EOF;
+            ERR_ALLOC
+            log_critical("Memory allocation error while compressing points");
+            return 0;
         }
-    }
-    else
-    {
+
         for (i = start; i < end; i++)
         {
-            if (fwrite(&points->data[i].ts, siridb->time->ts_sz, 1, fp) != 1 ||
-                fwrite(&points->data[i].val, 8, 1, fp) != 1)
-            {
-                ERR_FILE
-                log_critical("Cannot write points to file '%s'", shard->fn);
-                return EOF;
-            }
+            memcpy(cdata + p, &points->data[i].ts, ts_sz);
+            p += ts_sz;
+            memcpy(cdata + p, &points->data[i].val, 8);
+            p += 8;
         }
     }
 
-    if (fflush(fp))
+    long int rc = fwrite(cdata, dsize, 1, fp);
+    free(cdata);
+    if (rc != 1)
     {
         ERR_FILE
-        log_critical("Cannot write flush file '%s'", shard->fn);
-        return EOF;
+        log_critical("Cannot write points to file '%s'", shard->fn);
+        return 0;
     }
 
     shard->len = pos + dsize;
-
     return pos;
 }
 
@@ -1847,10 +1845,10 @@ static inline int SHARD_init_fn(siridb_t * siridb, siridb_shard_t * shard)
  * Write a header for a chunk of points. The header can be written to argument
  * fp which should be a pointer to the index, or the shard file.
  *
- * In case of an error the function return EOF, otherwise the size which is
+ * In case of an error the function returns 0, otherwise the size which is
  * written.
  */
-static int SHARD_write_header(
+static size_t SHARD_write_header(
         siridb_t * siridb,
         siridb_series_t * series,
         siridb_points_t * points,
@@ -1860,12 +1858,9 @@ static int SHARD_write_header(
         FILE * fp)
 {
     uint16_t len = end - start;
-    int size = EOF;
-
-    if (fwrite(&series->id, sizeof(uint32_t), 1, fp) != 1)
-    {
-        return EOF;
-    }
+    size_t size = sizeof(uint32_t);
+    char buf[24];
+    memcpy(buf, &series->id, sizeof(uint32_t));
 
     switch (siridb->time->ts_sz)
     {
@@ -1873,22 +1868,18 @@ static int SHARD_write_header(
         {
             uint32_t start_ts = (uint32_t) points->data[start].ts;
             uint32_t end_ts = (uint32_t) points->data[end - 1].ts;
-            if (fwrite(&start_ts, sizeof(uint32_t), 1, fp) != 1 ||
-                fwrite(&end_ts, sizeof(uint32_t), 1, fp) != 1)
-            {
-                return EOF;
-            }
+            memcpy(buf + size, &start_ts, sizeof(uint32_t));
+            size += sizeof(uint32_t);
+            memcpy(buf + size, &end_ts, sizeof(uint32_t));
+            size += sizeof(uint32_t);
         }
-        size = IDX32_SZ;
         break;
 
     case sizeof(uint64_t):
-        if (fwrite(&points->data[start].ts, sizeof(uint64_t), 1, fp) != 1 ||
-            fwrite(&points->data[end - 1].ts, sizeof(uint64_t), 1, fp) != 1)
-        {
-            return EOF;
-        }
-        size = IDX64_SZ;
+        memcpy(buf + size, &points->data[start].ts, sizeof(uint64_t));
+        size += sizeof(uint64_t);
+        memcpy(buf + size, &points->data[end - 1].ts, sizeof(uint64_t));
+        size += sizeof(uint64_t);
         break;
 
     default:
@@ -1896,19 +1887,18 @@ static int SHARD_write_header(
         break;
     }
 
+    memcpy(buf + size, &len, sizeof(uint16_t));
+    size += sizeof(uint16_t);
 
-    if (fwrite(&len, sizeof(uint16_t), 1, fp) != 1)
+    if (cinfo != NULL)
     {
-        return EOF;
+        memcpy(buf + size, cinfo, sizeof(uint16_t));
+        size += sizeof(uint16_t);
     }
 
-    if (cinfo != NULL)
+    if (fwrite(buf, size, 1, fp) != 1)
     {
-        size += sizeof(uint16_t);
-        if (fwrite(cinfo, sizeof(uint16_t), 1, fp) != 1)
-        {
-            return EOF;
-        }
+        return 0;
     }
 
     return size;
index bbff50cfe403057a8c8dc8930b0ace3a54c019ba..a52480cbf6b65caad7798ea8d0a299bf423b6157 100644 (file)
@@ -137,8 +137,7 @@ int siridb_shards_add_points(
     uint_fast32_t start, end, num_chunks, pstart, pend;
     uint16_t chunk_sz;
     uint16_t cinfo = 0;
-    size_t size;
-    long int pos;
+    size_t size, pos;
 
     for (end = 0; end < points->len;)
     {
@@ -188,7 +187,7 @@ int siridb_shards_add_points(
                         pstart,
                         pend,
                         NULL,
-                        &cinfo)) < 0)
+                        &cinfo)) == 0)
                 {
                     log_critical(
                             "Could not write points to shard id %" PRIu64,
index 928f826921039c8d00668f9e7e09a03f445b6ce6..c4eedeba5c6048e04c536401a7800c155afc1421 100644 (file)
@@ -50,7 +50,7 @@ class TestInsert(TestBase):
             await client.insert_some_series(series, timeout=timeout, points=self.GEN_POINTS)
             await asyncio.sleep(1.0)
 
-    @default_test_setup(2, time_precision=TIME_PRECISION, compression=True)
+    @default_test_setup(2, time_precision=TIME_PRECISION, compression=False)
     async def run(self):
         await self.client0.connect()